// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * These are the only exported functions, they provide some generic
 * infrastructure for managing object devices
 */

#define DEBUG_SUBSYSTEM S_CLASS

#include <linux/pid_namespace.h>
#include <linux/workqueue.h>
#include <lustre_compat.h>
#include <cfs_hash.h>
#include <obd_class.h>
#include <lustre_log.h>
#include <lprocfs_status.h>
#include <lustre_disk.h>
#include <lustre_kernelcomm.h>

/* xarray of all obd devices, indexed by obd_minor (allocating variant) */
DEFINE_XARRAY_ALLOC(obd_devs);
EXPORT_SYMBOL(obd_devs);

/* count of devices currently present in obd_devs */
static atomic_t obd_devs_count = ATOMIC_INIT(0);

/* slab cache backing struct obd_device allocations */
static struct kmem_cache *obd_device_cachep;
static struct kobj_type class_ktype;
/* NOTE(review): presumably services exp/imp zombie work items — confirm users */
static struct workqueue_struct *zombie_wq;

static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
			      const char *status, int locks, int debug_level);

/* unlinked exports awaiting destruction; guarded by obd_stale_export_lock */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Allocate one obd_device from the slab cache and stamp its magic. */
static struct obd_device *obd_device_alloc(void)
{
	struct obd_device *dev;

	OBD_SLAB_ALLOC_PTR_GFP(dev, obd_device_cachep, GFP_NOFS);
	if (dev)
		dev->obd_magic = OBD_DEVICE_MAGIC;

	return dev;
}

/* Return an obd_device to the slab cache.
 * Sanity-checks the magic and insists the LDLM namespace was torn down
 * first, since freeing with a live namespace would leak/UAF it.
 */
static void obd_device_free(struct obd_device *obd)
{
	LASSERT(obd != NULL);
	LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
		 "obd %px obd_magic %08x != %08x\n",
		 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
	LASSERTF(obd->obd_namespace == NULL,
		 "obd %px: namespace %px was not properly cleaned up (obd_force=%d)!\n",
		 obd, obd->obd_namespace, obd->obd_force);
	OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
}

/* Look up an obd_type by name in the lustre kset.
 * On success the counted kobject reference from kset_find_obj() is
 * handed to the caller, who must kobject_put() it.  A kobject with a
 * foreign ktype is released here and NULL is returned
 * (kobject_put(NULL) is a no-op, so the miss path is safe too).
 */
struct obd_type *class_search_type(const char *name)
{
	struct kobject *kobj = kset_find_obj(lustre_kset, name);

	if (kobj && kobj->ktype == &class_ktype)
		return container_of(kobj, struct obd_type, typ_kobj);

	kobject_put(kobj);
	return NULL;
}
EXPORT_SYMBOL(class_search_type);

/* Find an obd_type by name and take both a module and a typ_refcnt
 * reference on it, loading the backing module on demand when possible.
 * Returns NULL if the type cannot be found or its module is going away.
 * Callers release via class_put_type().
 */
SERVER_ONLY struct obd_type *class_get_type(const char *name)
{
	struct obd_type *type;

	rcu_read_lock();
	type = class_search_type(name);
#ifdef HAVE_MODULE_LOADING_SUPPORT
	if (!type) {
		const char *modname = name;

#ifdef HAVE_SERVER_SUPPORT
		/* some type names map to differently-named modules */
		if (strcmp(modname, "obdfilter") == 0 ||
		    strcmp(modname, LUSTRE_OSS_NAME) == 0)
			modname = "ofd";

		if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
			modname = LUSTRE_OSP_NAME;

		if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
			modname = LUSTRE_MDT_NAME;
#endif /* HAVE_SERVER_SUPPORT */

		/* request_module() may sleep: drop RCU around it */
		rcu_read_unlock();
		if (!request_module("%s", modname)) {
			CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
		} else {
			LCONSOLE_ERROR("Can't load module '%s'\n", modname);
		}
		rcu_read_lock();
		type = class_search_type(name);
	}
#endif
	if (type) {
		/*
		 * Holding rcu_read_lock() matches the synchronize_rcu() call
		 * in free_module() and ensures that if type->typ_dt_ops is
		 * not yet NULL, then the module won't be freed until after
		 * we rcu_read_unlock().
		 */
		const struct obd_ops *dt_ops = READ_ONCE(type->typ_dt_ops);

		if (dt_ops && try_module_get(dt_ops->o_owner)) {
			atomic_inc(&type->typ_refcnt);
			/* class_search_type() returned a counted ref, this
			 * count not needed as we could get it via typ_refcnt
			 */
			kobject_put(&type->typ_kobj);
		} else {
			/* module is unloading (or type unregistered):
			 * drop the search ref and report failure
			 */
			kobject_put(&type->typ_kobj);
			type = NULL;
		}
	}
	rcu_read_unlock();
	return type;
}
SERVER_ONLY_EXPORT_SYMBOL(class_get_type);

/* Release the references taken by class_get_type().
 * NOTE(review): this dereferences typ_dt_ops, which
 * class_unregister_type() sets to NULL — presumably unregister cannot
 * race with an outstanding class_get_type() reference; confirm.
 */
void class_put_type(struct obd_type *type)
{
	LASSERT(type);
	module_put(type->typ_dt_ops->o_owner);
	atomic_dec(&type->typ_refcnt);
}
EXPORT_SYMBOL(class_put_type);

/* kobject release callback for obd_type: tears down debugfs/procfs
 * entries and the lu_device_type, then frees the obd_type itself.
 * Runs when the last kobject reference is dropped.
 */
static void class_sysfs_release(struct kobject *kobj)
{
	struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);

	debugfs_remove_recursive(type->typ_debugfs_entry);
	type->typ_debugfs_entry = NULL;

	if (type->typ_lu)
		lu_device_type_fini(type->typ_lu);

#ifdef CONFIG_PROC_FS
	if (type->typ_name && type->typ_procroot)
		remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
	OBD_FREE(type, sizeof(*type));
}

static struct kobj_type class_ktype = {
	.sysfs_ops      = &lustre_sysfs_ops,
	.release        = class_sysfs_release,
};

#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd_type (sysfs kobject, debugfs dir, optional
 * compat proc root) for \a name before the real module registers it.
 *
 * \param[in] name        obd type name
 * \param[in] enable_proc also create a compatibility /proc entry
 *
 * \retval type           new obd_type on success
 * \retval ERR_PTR(errno) on error (-EEXIST if the type already exists)
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
	struct dentry *symlink;
	struct obd_type *type;
	int rc;

	type = class_search_type(name);
	if (type) {
		kobject_put(&type->typ_kobj);
		return ERR_PTR(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (!type)
		return ERR_PTR(-ENOMEM);

	type->typ_kobj.kset = lustre_kset;
	rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
				  &lustre_kset->kobj, "%s", name);
	if (rc) {
		/* kobject_init_and_add() requires kobject_put() even on
		 * failure; class_sysfs_release() then frees @type, which
		 * previously leaked on this path
		 */
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}

	symlink = debugfs_create_dir(name, debugfs_lustre_root);
	type->typ_debugfs_entry = symlink;
	type->typ_sym_filter = true;

	if (enable_proc) {
		type->typ_procroot = lprocfs_register(name, proc_lustre_root,
						      NULL, NULL);
		if (IS_ERR(type->typ_procroot)) {
			CERROR("%s: can't create compat proc entry: %d\n",
			       name, (int)PTR_ERR(type->typ_procroot));
			type->typ_procroot = NULL;
		}
	}

	return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */

#define CLASS_MAX_NAME 1024

/* Register a new obd type, optionally with a lu_device_type.
 * May adopt a placeholder created earlier by class_add_symlinks()
 * (typ_sym_filter path).  On failure the kobject reference is dropped
 * and class_sysfs_release() cleans up whatever was set up.
 *
 * Returns 0 on success, -EEXIST if the type is already registered,
 * or a negative errno from proc/kobject/lu setup.
 */
int class_register_type(const struct obd_ops *dt_ops,
			const struct md_ops *md_ops,
			bool enable_proc,
			const char *name, struct lu_device_type *ldt)
{
	struct obd_type *type;
	int rc;

	ENTRY;
	/* sanity check */
	LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

	type = class_search_type(name);
	if (type) {
#ifdef HAVE_SERVER_SUPPORT
		/* placeholder from class_add_symlinks(): reuse it */
		if (type->typ_sym_filter)
			goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
		kobject_put(&type->typ_kobj);
		CDEBUG(D_IOCTL, "Type %s already registered\n", name);
		RETURN(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (type == NULL)
		RETURN(-ENOMEM);

	/* OBD_LU_TYPE_SETUP marks typ_lu as "setup in progress" until the
	 * smp_store_release() below publishes the real ldt (or NULL)
	 */
	type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
	type->typ_kobj.kset = lustre_kset;
	kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

	type->typ_dt_ops = dt_ops;
	type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
	if (type->typ_sym_filter) {
		/* drop the class_search_type() reference; debugfs/proc
		 * entries already exist on the placeholder
		 */
		type->typ_sym_filter = false;
		kobject_put(&type->typ_kobj);
		goto setup_ldt;
	}
#endif
#ifdef CONFIG_PROC_FS
	if (enable_proc && !type->typ_procroot) {
		type->typ_procroot = lprocfs_register(name,
						      proc_lustre_root,
						      NULL, type);
		if (IS_ERR(type->typ_procroot)) {
			rc = PTR_ERR(type->typ_procroot);
			type->typ_procroot = NULL;
			GOTO(failed, rc);
		}
	}
#endif
	type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);

	rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
	if (rc)
		GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
	if (ldt) {
		rc = lu_device_type_init(ldt);
		/* publish typ_lu and wake anyone waiting on the setup */
		smp_store_release(&type->typ_lu, rc ? NULL : ldt);
		wake_up_var(&type->typ_lu);
		if (rc)
			GOTO(failed, rc);
	}

	RETURN(0);

failed:
	kobject_put(&type->typ_kobj);

	RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);

/* Unregister an obd type by name.
 * Returns -EINVAL for an unknown type and -EBUSY if it still has
 * users (in which case ops are cleared but the name is kept for
 * debugging and the final kobject reference is deliberately leaked).
 */
int class_unregister_type(const char *name)
{
	struct obd_type *type = class_search_type(name);
	int rc = 0;

	ENTRY;
	if (!type) {
		CERROR("unknown obd type\n");
		RETURN(-EINVAL);
	}

	/*
	 * Ensure that class_get_type doesn't try to get the module
	 * as it could be freed before the obd_type is released.
	 * synchronize_rcu() will be called before the module
	 * is freed.
	 */
	type->typ_dt_ops = NULL;

	if (atomic_read(&type->typ_refcnt)) {
		CERROR("type %s has refcount (%d)\n", name,
		       atomic_read(&type->typ_refcnt));
		/* This is a bad situation, let's make the best of it */
		/* Remove ops, but leave the name for debugging */
		type->typ_md_ops = NULL;
		GOTO(out_put, rc = -EBUSY);
	}

	/* Put the final ref */
	kobject_put(&type->typ_kobj);
out_put:
	/* Put the ref returned by class_search_type() */
	kobject_put(&type->typ_kobj);

	RETURN(rc);
} /* class_unregister_type */
EXPORT_SYMBOL(class_unregister_type);

/**
 * Create a new obd device.
 *
 * Allocate the new obd_device and initialize it.
 *
 * \param[in] type_name obd device type string.
 * \param[in] name      obd device name.
 * \param[in] uuid      obd device UUID
 *
 * \retval newdev         pointer to created obd_device
 * \retval ERR_PTR(errno) on error
 */
struct obd_device *class_newdev(const char *type_name, const char *name,
				const char *uuid)
{
	struct obd_device *newdev;
	struct obd_type *type = NULL;

	ENTRY;

	if (strlen(name) >= MAX_OBD_NAME) {
		CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
		RETURN(ERR_PTR(-EINVAL));
	}

	/* pins the type (module + typ_refcnt); released by class_free_dev() */
	type = class_get_type(type_name);
	if (type == NULL) {
		CERROR("OBD: unknown type: %s\n", type_name);
		RETURN(ERR_PTR(-ENODEV));
	}

	newdev = obd_device_alloc();
	if (newdev == NULL) {
		class_put_type(type);
		RETURN(ERR_PTR(-ENOMEM));
	}
	LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
	/* name length checked above, so the copy always NUL-terminates */
	strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
	newdev->obd_type = type;
	/* not yet registered in obd_devs */
	newdev->obd_minor = -1;

	rwlock_init(&newdev->obd_pool_lock);
	newdev->obd_pool_limit = 0;
	newdev->obd_pool_slv = 0;

	INIT_LIST_HEAD(&newdev->obd_exports);
	newdev->obd_num_exports = 0;
	newdev->obd_grant_check_threshold = 100;
	INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
	INIT_LIST_HEAD(&newdev->obd_delayed_exports);
	newdev->obd_exports_timed.rb_node = NULL;
	INIT_LIST_HEAD(&newdev->obd_nid_stats);
	spin_lock_init(&newdev->obd_nid_lock);
	spin_lock_init(&newdev->obd_dev_lock);
	mutex_init(&newdev->obd_dev_mutex);
	spin_lock_init(&newdev->obd_osfs_lock);
	/* newdev->obd_osfs_age must be set to a value in the distant
	 * past to guarantee a fresh statfs is fetched on mount.
	 */
	newdev->obd_osfs_age = ktime_get_seconds() - 1000;

	/* XXX belongs in setup not attach  */
	init_rwsem(&newdev->obd_observer_link_sem);
	/* recovery data */
	spin_lock_init(&newdev->obd_recovery_task_lock);
	init_waitqueue_head(&newdev->obd_next_transno_waitq);
	INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
	INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
	INIT_LIST_HEAD(&newdev->obd_final_req_queue);
	INIT_LIST_HEAD(&newdev->obd_evict_list);
	INIT_LIST_HEAD(&newdev->obd_lwp_list);

	llog_group_init(&newdev->obd_olg);
	/* Detach drops this */
	kref_init(&newdev->obd_refcount);

	atomic_set(&newdev->obd_conn_inprogress, 0);

	/* NOTE(review): assumes obd_uuid.uuid holds UUID_MAX bytes plus a
	 * terminator, or that @uuid is shorter than UUID_MAX — confirm
	 */
	strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);

	CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
	       newdev->obd_name, newdev);

	return newdev;
}

/**
 * Free obd device.
 *
 * \param[in] obd obd_device to be freed
 *
 * \retval none
 */
void class_free_dev(struct obd_device *obd)
{
	struct obd_type *obd_type = obd->obd_type;

	LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
		 "%px obd_magic %08x != %08x\n",
		 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
	LASSERTF(obd->obd_minor == -1 || class_num2obd(obd->obd_minor) == obd,
		 "obd %px != obd_devs[%d] %px\n",
		 obd, obd->obd_minor, class_num2obd(obd->obd_minor));
	LASSERTF(kref_read(&obd->obd_refcount) == 0,
		 "obd_refcount should be 0, not %d\n",
		 kref_read(&obd->obd_refcount));
	LASSERT(obd_type != NULL);

	CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
	       obd->obd_name, obd->obd_type->typ_name);

	CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
			 obd->obd_name, obd->obd_uuid.uuid);
	if (obd->obd_stopping) {
		int err;

		/* If we're not stopping, we were never set up */
		err = obd_cleanup(obd);
		if (err)
			CERROR("Cleanup %s returned %d\n",
				obd->obd_name, err);
	}

	obd_device_free(obd);

	/* drop the type pin taken in class_newdev() */
	class_put_type(obd_type);
}

/* Translate an obd name to its minor number; caller holds the device
 * lock.  Only fully attached devices are resolved; returns -1 when the
 * name is NULL, unknown, or not yet attached.
 */
static int class_name2dev_nolock(const char *name)
{
	struct obd_device *obd = NULL;
	unsigned long idx = 0;

	if (name == NULL)
		return -1;

	obd_device_for_each(idx, obd) {
		if (strcmp(obd->obd_name, name) != 0)
			continue;
		/*
		 * Make sure we finished attaching before we give
		 * out any references
		 */
		LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
		if (test_bit(OBDF_ATTACHED, obd->obd_flags))
			return obd->obd_minor;
		break;
	}

	return -1;
}

/* Locked wrapper around class_name2dev_nolock(). */
int class_name2dev(const char *name)
{
	int minor;

	obd_device_lock();
	minor = class_name2dev_nolock(name);
	obd_device_unlock();

	return minor;
}
EXPORT_SYMBOL(class_name2dev);

/**
 * Unregister obd device.
 *
 * Remove an obd from obd_dev
 *
 * \param[in] new_obd obd_device to be unregistered
 *
 * \retval none
 */
void class_unregister_device(struct obd_device *obd)
{
	obd_device_lock();
	if (obd->obd_minor >= 0) {
		/* remove from the xarray and drop the registration ref
		 * taken in class_register_device()
		 */
		__xa_erase(&obd_devs, obd->obd_minor);
		class_decref(obd, "obd_device_list", obd);
		obd->obd_minor = -1;
		atomic_dec(&obd_devs_count);
	}
	obd_device_unlock();
}

/**
 * Register obd device.
 *
 * Add new_obd to obd_devs
 *
 * \param[in] new_obd obd_device to be registered
 *
 * \retval 0          success
 * \retval -EEXIST    device with this name is registered
 */
int class_register_device(struct obd_device *new_obd)
{
	int rc = 0;
	int dev_no = 0;

	if (new_obd == NULL) {
		rc = -1;
		goto out;
	}

	/* obd_device waiting to be destroyed by "obd_zombie_impexp_thread" */
	if (class_name2dev(new_obd->obd_name) != -1)
		obd_zombie_barrier();

	obd_device_lock();
	if (class_name2dev_nolock(new_obd->obd_name) == -1) {
		class_incref(new_obd, "obd_device_list", new_obd);
		rc = __xa_alloc(&obd_devs, &dev_no, new_obd,
				xa_limit_31b, GFP_ATOMIC);

		if (rc != 0)
			goto out;

		new_obd->obd_minor = dev_no;
		atomic_inc(&obd_devs_count);
	} else {
		rc = -EEXIST;
	}

out:
	obd_device_unlock();
	RETURN(rc);
}

/* Look up an attached obd_device by name; NULL if not found.
 * NOTE(review): relies on obd_device_for_each() leaving @obd NULL when
 * the iteration completes without a break (xa_for_each semantics) —
 * confirm against the macro definition.
 */
struct obd_device *class_name2obd(const char *name)
{
	struct obd_device *obd = NULL;
	unsigned long dev_no = 0;

	if (!name)
		return NULL;

	obd_device_lock();
	obd_device_for_each(dev_no, obd) {
		if (strcmp(name, obd->obd_name) == 0) {
			/*
			 * Make sure we finished attaching before we give
			 * out any references
			 */
			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
			if (test_bit(OBDF_ATTACHED, obd->obd_flags))
				break;
		}
	}
	obd_device_unlock();

	/*
	 * TODO: We give out a reference without class_incref(). This isn't
	 * ideal, but this behavior is identical in previous implementations
	 * of this function.
	 */
	return obd;
}
EXPORT_SYMBOL(class_name2obd);

/* Map a device UUID to its minor number, or -1 if no device matches. */
int class_uuid2dev(struct obd_uuid *uuid)
{
	struct obd_device *obd = NULL;
	unsigned long idx = 0;
	int minor = -1;

	obd_device_lock();
	obd_device_for_each(idx, obd) {
		if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
			minor = obd->obd_minor;
			break;
		}
	}
	obd_device_unlock();

	return minor;
}
EXPORT_SYMBOL(class_uuid2dev);

/* Look up an obd_device by UUID; NULL if not found.
 * NOTE(review): like class_name2obd(), relies on the iterator leaving
 * @obd NULL after an exhausted loop — confirm against the macro.
 */
struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
{
	struct obd_device *obd = NULL;
	unsigned long dev_no = 0;

	obd_device_lock();
	obd_device_for_each(dev_no, obd) {
		if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
			break;
		}
	}
	obd_device_unlock();

	/*
	 * TODO: We give out a reference without class_incref(). This isn't
	 * ideal, but this behavior is identical in previous implementations
	 * of this function.
	 */
	return obd;
}
EXPORT_SYMBOL(class_uuid2obd);

/* Fetch the obd_device registered under minor @dev_no (NULL if none). */
struct obd_device *class_num2obd(int dev_no)
{
	return xa_load(&obd_devs, dev_no);
}
EXPORT_SYMBOL(class_num2obd);

/**
 * Find obd by name or uuid.
 *
 * Increment obd's refcount if found.
 *
 * \param[in] str obd name or uuid
 *
 * \retval NULL    if not found
 * \retval obd     pointer to found obd_device
 */
struct obd_device *class_str2obd(const char *str)
{
	struct obd_device *obd = NULL;
	struct obd_uuid uuid;
	unsigned long dev_no = 0;

	obd_str2uuid(&uuid, str);

	obd_device_lock();
	obd_device_for_each(dev_no, obd) {
		/* @str may name the device either by uuid or by name */
		if (obd_uuid_equals(&uuid, &obd->obd_uuid) ||
		    (strcmp(str, obd->obd_name) == 0)) {
			/*
			 * Make sure we finished attaching before we give
			 * out any references
			 */
			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
			if (test_bit(OBDF_ATTACHED, obd->obd_flags)) {
				/* counted reference, caller must decref */
				class_incref(obd, "find", current);
				break;
			}
			/* matched but not attached: fail the lookup */
			obd_device_unlock();
			RETURN(NULL);
		}
	}
	obd_device_unlock();

	RETURN(obd);
}
EXPORT_SYMBOL(class_str2obd);

/**
 * Get obd devices count. Device in any
 *    state are counted
 * \retval obd device count
 */
int class_obd_devs_count(void)
{
	/* snapshot only; may change as soon as it is read */
	return atomic_read(&obd_devs_count);
}
EXPORT_SYMBOL(class_obd_devs_count);

/* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
 * specified, then only the client with that uuid is returned,
 * otherwise any client connected to the tgt is returned.
 */
struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
					 const char *type_name,
					 struct obd_uuid *grp_uuid)
{
	struct obd_device *obd = NULL;
	unsigned long dev_no = 0;

	obd_device_lock();
	obd_device_for_each(dev_no, obd) {
		/* prefix match: type_name need only be a leading substring
		 * of the device's type name
		 */
		if ((strncmp(obd->obd_type->typ_name, type_name,
			     strlen(type_name)) == 0)) {
			if (obd_uuid_equals(tgt_uuid,
					    &obd->u.cli.cl_target_uuid) &&
			    ((grp_uuid) ? obd_uuid_equals(grp_uuid,
							 &obd->obd_uuid) : 1)) {
				/* no reference taken — see class_name2obd() */
				obd_device_unlock();
				return obd;
			}
		}
	}
	obd_device_unlock();

	return NULL;
}
EXPORT_SYMBOL(class_find_client_obd);

/**
 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
 * adjust sptlrpc settings accordingly.
 */
int class_notify_sptlrpc_conf(const char *fsname, int namelen)
{
	struct obd_device *obd = NULL;
	unsigned long dev_no = 0;
	const char *type;
	int rc = 0, rc2;

	LASSERT(namelen > 0);

	obd_device_lock();
	obd_device_for_each(dev_no, obd) {
		if (!test_bit(OBDF_SET_UP, obd->obd_flags) || obd->obd_stopping)
			continue;

		/* only notify mdc, osc, osp, lwp, mdt, ost
		 * because only these have a -sptlrpc llog
		 */
		type = obd->obd_type->typ_name;
		if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
		    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
		    strcmp(type, LUSTRE_OSP_NAME) != 0 &&
		    strcmp(type, LUSTRE_LWP_NAME) != 0 &&
		    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
		    strcmp(type, LUSTRE_OST_NAME) != 0)
			continue;

		if (strncmp(obd->obd_name, fsname, namelen))
			continue;

		/* pin the device, then drop the lock for the (potentially
		 * blocking) RPC; the xarray iterator resumes safely after
		 * the lock is retaken
		 */
		class_incref(obd, __func__, obd);
		obd_device_unlock();
		rc2 = obd_set_info_async(NULL, obd->obd_self_export,
					 sizeof(KEY_SPTLRPC_CONF),
					 KEY_SPTLRPC_CONF, 0, NULL, NULL);
		/* keep the first error but keep notifying the rest */
		rc = rc ? rc : rc2;
		obd_device_lock();
		class_decref(obd, __func__, obd);
	}
	obd_device_unlock();

	return rc;
}
EXPORT_SYMBOL(class_notify_sptlrpc_conf);

/* Destroy the obd_device slab cache (idempotent). */
void obd_cleanup_caches(void)
{
	ENTRY;
	/* kmem_cache_destroy() is a no-op on NULL, so no guard is needed */
	kmem_cache_destroy(obd_device_cachep);
	obd_device_cachep = NULL;

	EXIT;
}

/* Create the obd_device slab cache; returns 0 or -ENOMEM.
 * The whole object is whitelisted for usercopy (last two size args).
 */
int obd_init_caches(void)
{
	int rc;

	ENTRY;

	LASSERT(obd_device_cachep == NULL);
	obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
				sizeof(struct obd_device),
				0, 0, 0, sizeof(struct obd_device), NULL);
	if (!obd_device_cachep)
		GOTO(out, rc = -ENOMEM);

	RETURN(0);
out:
	obd_cleanup_caches();
	RETURN(rc);
}

static const char export_handle_owner[] = "export";

/* map connection to client */
/* map connection to client */
struct obd_export *class_conn2export(struct lustre_handle *conn)
{
	struct obd_export *export;

	ENTRY;

	if (!conn) {
		CDEBUG(D_CACHE, "looking for null handle\n");
		RETURN(NULL);
	}

	if (conn->cookie == -1) {  /* this means assign a new connection */
		CDEBUG(D_CACHE, "want a new connection\n");
		RETURN(NULL);
	}

	CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
	/* returns a counted export reference (or NULL if stale) */
	export = class_handle2object(conn->cookie, export_handle_owner);
	RETURN(export);
}
EXPORT_SYMBOL(class_conn2export);

/* Return the obd_device backing @exp, or NULL for a NULL export. */
struct obd_device *class_exp2obd(struct obd_export *exp)
{
	return exp ? exp->exp_obd : NULL;
}
EXPORT_SYMBOL(class_exp2obd);

/* Return the client import of @exp's device, or NULL when either the
 * export or its device is missing.
 */
struct obd_import *class_exp2cliimp(struct obd_export *exp)
{
	if (!exp || !exp->exp_obd)
		return NULL;

	return exp->exp_obd->u.cli.cl_import;
}
EXPORT_SYMBOL(class_exp2cliimp);

/* Export management functions */
/* Final destruction of an export once its handle refcount hit zero.
 * Runs either inline (self export) or from the zombie workqueue.
 */
static void class_export_destroy(struct obd_export *exp)
{
	struct obd_device *obd = exp->exp_obd;

	ENTRY;

	LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
	LASSERT(obd != NULL);

	CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
	       exp->exp_client_uuid.uuid, obd->obd_name);

	/* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
	ptlrpc_connection_put(exp->exp_connection);

	LASSERT(list_empty(&exp->exp_outstanding_replies));
	LASSERT(list_empty(&exp->exp_uncommitted_replies));
	LASSERT(list_empty(&exp->exp_req_replay_queue));
	LASSERT(list_empty(&exp->exp_hp_rpcs));
	obd_destroy_export(exp);
	/* self export doesn't hold a reference to an obd, although it
	 * exists until freeing of the obd
	 */
	if (exp != obd->obd_self_export)
		class_decref(obd, "export", exp);

	/* RCU-deferred free: handle lookups may still be in flight */
	OBD_FREE_RCU(exp, sizeof(*exp), exp_handle.h_rcu);
	EXIT;
}

/* Take an extra handle reference on @exp; returns @exp for chaining. */
struct obd_export *class_export_get(struct obd_export *exp)
{
	refcount_inc(&exp->exp_handle.h_ref);
	CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
	       refcount_read(&exp->exp_handle.h_ref));
	return exp;
}
EXPORT_SYMBOL(class_export_get);

/* Drop a handle reference; on the last put the self export is torn
 * down synchronously (and the obd freed), other exports are queued to
 * the zombie machinery for asynchronous destruction.
 */
void class_export_put(struct obd_export *exp)
{
	LASSERT(exp != NULL);
	LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
	LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
	CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
	       refcount_read(&exp->exp_handle.h_ref) - 1);

	if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
		struct obd_device *obd = exp->exp_obd;

		CDEBUG(D_IOCTL, "final put %p/%s\n",
		       exp, exp->exp_client_uuid.uuid);

		/* release nid stat refererence */
		lprocfs_exp_cleanup(exp);

		if (exp == obd->obd_self_export) {
			/* self export should be destroyed without zombie
			 * thread as it doesn't hold a reference to obd and
			 * doesn't hold any resources
			 */
			class_export_destroy(exp);
			/* self export is destroyed, no class ref exist and it
			 * is safe to free obd
			 */
			class_free_dev(obd);
		} else {
			LASSERT(!list_empty(&exp->exp_obd_chain));
			obd_zombie_export_add(exp);
		}

	}
}
EXPORT_SYMBOL(class_export_put);

/* Workqueue callback: destroy one zombie export and wake any waiter
 * once the stale-export count drains to zero.
 */
static void obd_zombie_exp_cull(struct work_struct *ws)
{
	struct obd_export *export;

	export = container_of(ws, struct obd_export, exp_zombie_work);
	class_export_destroy(export);
	LASSERT(atomic_read(&obd_stale_export_num) > 0);
	if (atomic_dec_and_test(&obd_stale_export_num))
		wake_up_var(&obd_stale_export_num);
}

/* Creates a new export, adds it to the hash table, and returns a
 * pointer to it. The refcount is 2: one for the hash reference, and
 * one for the pointer returned by this function.
 */
/* Creates a new export, adds it to the hash table, and returns a
 * pointer to it. The refcount is 2: one for the hash reference, and
 * one for the pointer returned by this function.
 *
 * @is_self marks the device's self export, which is not chained on
 * obd_exports and takes no reference on the obd.
 */
static struct obd_export *__class_new_export(struct obd_device *obd,
					     struct obd_uuid *cluuid,
					     bool is_self)
{
	struct obd_export *export;
	int rc = 0;

	ENTRY;

	OBD_ALLOC_PTR(export);
	if (!export)
		return ERR_PTR(-ENOMEM);

	export->exp_conn_cnt = 0;
	export->exp_lock_hash = NULL;
	export->exp_flock_hash = NULL;
	/* 2 = class_handle_hash + last */
	refcount_set(&export->exp_handle.h_ref, 2);
	atomic_set(&export->exp_rpc_count, 0);
	atomic_set(&export->exp_cb_count, 0);
	atomic_set(&export->exp_locks_count, 0);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
	INIT_LIST_HEAD(&export->exp_locks_list);
	spin_lock_init(&export->exp_locks_list_guard);
#endif
	atomic_set(&export->exp_replay_count, 0);
	export->exp_obd = obd;
	INIT_LIST_HEAD(&export->exp_outstanding_replies);
	spin_lock_init(&export->exp_uncommitted_replies_lock);
	INIT_LIST_HEAD(&export->exp_uncommitted_replies);
	INIT_LIST_HEAD(&export->exp_req_replay_queue);
	INIT_HLIST_NODE(&export->exp_handle.h_link);
	INIT_LIST_HEAD(&export->exp_hp_rpcs);
	INIT_LIST_HEAD(&export->exp_reg_rpcs);
	/* makes the export reachable via its cookie from this point on */
	class_handle_hash(&export->exp_handle, export_handle_owner);
	export->exp_last_request_time = ktime_get_real_seconds();
	spin_lock_init(&export->exp_lock);
	spin_lock_init(&export->exp_rpc_lock);
	INIT_HLIST_NODE(&export->exp_gen_hash);
	spin_lock_init(&export->exp_bl_list_lock);
	INIT_LIST_HEAD(&export->exp_bl_list);
	INIT_LIST_HEAD(&export->exp_stale_list);
	INIT_LIST_HEAD(&export->exp_timed_chain);
	INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);

	export->exp_sp_peer = LUSTRE_SP_ANY;
	export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
	export->exp_client_uuid = *cluuid;
	obd_init_export(export);

	at_init(&export->exp_bl_lock_at, obd_timeout, 0);
	export->exp_root_fid.f_seq = 0;
	export->exp_root_fid.f_oid = 0;
	export->exp_root_fid.f_ver = 0;

	spin_lock(&obd->obd_dev_lock);
	/* self export (cluuid == obd uuid) skips the uuid hash */
	if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
		/* shouldn't happen, but might race */
		if (obd->obd_stopping)
			GOTO(exit_unlock, rc = -ENODEV);

		rc = obd_uuid_add(obd, export);
		if (rc != 0) {
			LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
				      obd->obd_name, cluuid->uuid, rc);
			GOTO(exit_unlock, rc = -EALREADY);
		}
	}

	if (!is_self) {
		class_incref(obd, "export", export);
		list_add(&export->exp_obd_chain, &obd->obd_exports);
		obd->obd_num_exports++;
	} else {
		INIT_LIST_HEAD(&export->exp_obd_chain);
	}
	spin_unlock(&obd->obd_dev_lock);
	RETURN(export);

exit_unlock:
	spin_unlock(&obd->obd_dev_lock);
	class_handle_unhash(&export->exp_handle);
	obd_destroy_export(export);
	OBD_FREE_PTR(export);
	return ERR_PTR(rc);
}

/* Create a regular (non-self) export on @obd for client @uuid. */
struct obd_export *class_new_export(struct obd_device *obd,
				    struct obd_uuid *uuid)
{
	return __class_new_export(obd, uuid, false);
}
EXPORT_SYMBOL(class_new_export);

/* Create the device's self export (no obd reference, not chained). */
struct obd_export *class_new_export_self(struct obd_device *obd,
					 struct obd_uuid *uuid)
{
	return __class_new_export(obd, uuid, true);
}

/* One node of obd_exports_timed: all exports sharing a deadline hang
 * off ned_head, keyed in the rb-tree by ned_deadline.
 */
struct rb_node_exp_deadline {
	struct rb_node	  ned_node;	/* link in obd_exports_timed */
	struct list_head  ned_head;	/* exports with this deadline */
	time64_t	  ned_deadline;	/* shared deadline (tree key) */
};

/* rb_add() ordering callback: order deadline nodes ascending. */
static inline bool ptlrpc_exp_deadline_less(struct rb_node *ln,
					    const struct rb_node *rn)
{
	struct rb_node_exp_deadline *a, *b;

	a = rb_entry(ln, struct rb_node_exp_deadline, ned_node);
	b = rb_entry(rn, struct rb_node_exp_deadline, ned_node);

	return a->ned_deadline < b->ned_deadline;
}

/* rb_find() comparison callback: key is a time64_t deadline. */
static inline int ptlrpc_exp_deadline_cmp(const void *key,
					  const struct rb_node *node)
{
	const time64_t *deadline = key;
	struct rb_node_exp_deadline *ned;

	ned = rb_entry(node, struct rb_node_exp_deadline, ned_node);
	if (*deadline < ned->ned_deadline)
		return -1;
	if (*deadline > ned->ned_deadline)
		return 1;
	return 0;
}

/**
 * Pre-allocate the deadline rb-tree node later consumed by
 * obd_export_timed_add().
 *
 * \param[in]  exp  export the node is being prepared for (unused here)
 * \param[out] data receives the new struct rb_node_exp_deadline
 *
 * \retval 0        success
 * \retval -ENOMEM  allocation failed
 */
int obd_export_timed_init(struct obd_export *exp, void **data)
{
	OBD_ALLOC(*data, sizeof(struct rb_node_exp_deadline));
	/* test the allocation (*data), not the out-pointer: the old
	 * "data == NULL" check could never fire, so an allocation
	 * failure was silently reported as success
	 */
	return *data == NULL ? -ENOMEM : 0;
}
EXPORT_SYMBOL(obd_export_timed_init);

/* Release an unused pre-allocated deadline node and clear the slot. */
void obd_export_timed_fini(struct obd_export *exp, void **data)
{
	struct rb_node_exp_deadline *ned = *data;

	if (ned == NULL)
		return;

	*data = NULL;
	OBD_FREE(ned, sizeof(*ned));
}
EXPORT_SYMBOL(obd_export_timed_fini);

/* Insert @exp into its device's deadline tree.  Consumes the node
 * pre-allocated in *data when exp_deadline is not yet in the tree
 * (setting *data to NULL); otherwise chains onto the existing node.
 */
void obd_export_timed_add(struct obd_export *exp, void **data)
{
	struct rb_node_exp_deadline *ned = *data;
	struct rb_node *node;

	node = rb_find(&exp->exp_deadline, &exp->exp_obd->obd_exports_timed,
		       ptlrpc_exp_deadline_cmp);

	if (node == NULL) {
		LASSERT(ned != NULL);
		INIT_LIST_HEAD(&ned->ned_head);
		RB_CLEAR_NODE(&ned->ned_node);
		ned->ned_deadline = exp->exp_deadline;
		/* ownership moves to the tree */
		*data = NULL;

		rb_add(&ned->ned_node, &exp->exp_obd->obd_exports_timed,
		       ptlrpc_exp_deadline_less);
	} else {
		ned = rb_entry(node, struct rb_node_exp_deadline, ned_node);
		LASSERT(!list_empty(&ned->ned_head));
	}

	list_add_tail(&exp->exp_timed_chain, &ned->ned_head);
}
EXPORT_SYMBOL(obd_export_timed_add);

/* Remove @exp from the deadline tree, freeing the tree node once its
 * list drains.  Relies on the invariant that a non-empty
 * exp_timed_chain implies a matching node exists (rb_find() must not
 * return NULL here; the LASSERTs would trip otherwise).
 */
void obd_export_timed_del(struct obd_export *exp)
{
	struct rb_node_exp_deadline *ned;

	if (list_empty(&exp->exp_timed_chain))
		return;

	ned = rb_entry(rb_find(&exp->exp_deadline,
			       &exp->exp_obd->obd_exports_timed,
			       ptlrpc_exp_deadline_cmp),
		       struct rb_node_exp_deadline, ned_node);
	LASSERT(!list_empty(&ned->ned_head));
	LASSERT(ned->ned_deadline == exp->exp_deadline);
	list_del_init(&exp->exp_timed_chain);

	if (list_empty(&ned->ned_head)) {
		rb_erase(&ned->ned_node, &exp->exp_obd->obd_exports_timed);
		OBD_FREE_PTR(ned);
	}
}
EXPORT_SYMBOL(obd_export_timed_del);

/* Return the oldest entry of the earliest (or, with @last, the latest)
 * deadline bucket, or NULL when the tree is empty.
 */
struct obd_export *obd_export_timed_get(struct obd_device *obd, bool last)
{
	struct rb_node_exp_deadline *ned;
	struct rb_node *node;

	if (last)
		node = rb_last(&obd->obd_exports_timed);
	else
		node = rb_first(&obd->obd_exports_timed);

	if (!node)
		return NULL;

	ned = rb_entry(node, struct rb_node_exp_deadline, ned_node);
	LASSERT(!list_empty(&ned->ned_head));

	return list_first_entry(&ned->ned_head, struct obd_export,
				exp_timed_chain);
}
EXPORT_SYMBOL(obd_export_timed_get);

/* Detach @exp from all lookup structures (handle hash, uuid hash, gen
 * hash, deadline tree) and park it on obd_unlinked_exports; its hash
 * reference is handed over to the stale-export machinery.
 */
void class_unlink_export(struct obd_export *exp)
{
	class_handle_unhash(&exp->exp_handle);

	if (exp->exp_obd->obd_self_export == exp) {
		/* self export: just drop the hash reference */
		class_export_put(exp);
		return;
	}

	spin_lock(&exp->exp_obd->obd_dev_lock);
	/* delete an uuid-export hashitem from hashtables */
	if (exp != exp->exp_obd->obd_self_export)
		obd_uuid_del(exp->exp_obd, exp);

#ifdef HAVE_SERVER_SUPPORT
	if (!hlist_unhashed(&exp->exp_gen_hash)) {
		struct tg_export_data	*ted = &exp->exp_target_data;
		struct cfs_hash		*hash;

		/* Because obd_gen_hash will not be released until
		 * class_cleanup(), so hash should never be NULL here
		 */
		hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
		LASSERT(hash != NULL);
		cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
			     &exp->exp_gen_hash);
		cfs_hash_putref(hash);
	}
#endif /* HAVE_SERVER_SUPPORT */

	list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
	obd_export_timed_del(exp);
	exp->exp_obd->obd_num_exports--;
	spin_unlock(&exp->exp_obd->obd_dev_lock);

	/* A reference is kept by obd_stale_exports list */
	obd_stale_export_put(exp);
}
EXPORT_SYMBOL(class_unlink_export);

/* Import management functions */
/* Final destruction of an import whose refcount reached zero: drop
 * its connections, release the obd "import" reference, free it.
 */
static void obd_zombie_import_free(struct obd_import *imp)
{
	struct obd_import_conn *imp_conn;

	ENTRY;
	CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
	       imp->imp_obd->obd_name);

	LASSERT(refcount_read(&imp->imp_refcount) == 0);

	ptlrpc_connection_put(imp->imp_connection);

	/* drain the list of alternate connections */
	while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
						    struct obd_import_conn,
						    oic_item)) != NULL) {
		list_del_init(&imp_conn->oic_item);
		ptlrpc_connection_put(imp_conn->oic_conn);
		OBD_FREE(imp_conn, sizeof(*imp_conn));
	}

	LASSERT(imp->imp_sec == NULL);
	LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
		 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
	class_decref(imp->imp_obd, "import", imp);
	OBD_FREE_PTR(imp);
	EXIT;
}

/* Take an extra reference on @import; returns it for chaining. */
struct obd_import *class_import_get(struct obd_import *import)
{
	refcount_inc(&import->imp_refcount);
	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
	       refcount_read(&import->imp_refcount),
	       import->imp_obd->obd_name);
	return import;
}
EXPORT_SYMBOL(class_import_get);

/* Drop a reference; the last put queues the import to the zombie
 * machinery for asynchronous destruction (obd_zombie_imp_cull).
 */
void class_import_put(struct obd_import *imp)
{
	ENTRY;

	LASSERT(refcount_read(&imp->imp_refcount) > 0);

	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
	       refcount_read(&imp->imp_refcount) - 1,
	       imp->imp_obd->obd_name);

	if (refcount_dec_and_test(&imp->imp_refcount)) {
		CDEBUG(D_INFO, "final put import %p\n", imp);
		obd_zombie_import_add(imp);
	}

	EXIT;
}
EXPORT_SYMBOL(class_import_put);

/* Initialize the adaptive-timeout state of a new import. */
static void init_imp_at(struct imp_at *at)
{
	int i;

	at_init(&at->iat_net_latency, 0, 0);
	for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
		/* max service estimates are tracked server side, so don't
		 * use AT history here, just use the last reported val. (But
		 * keep hist for proc histogram, worst_ever)
		 */
		at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
			AT_FLG_NOHIST);
	}
}

/* Workqueue callback: free one zombie import. */
static void obd_zombie_imp_cull(struct work_struct *ws)
{
	struct obd_import *import;

	import = container_of(ws, struct obd_import, imp_zombie_work);
	obd_zombie_import_free(import);
}

/* Allocate and initialize a new import for device @obd.
 *
 * The import holds a reference on @obd for its lifetime and is returned
 * with a refcount of 2: one for the caller and one that is dropped by
 * class_destroy_import(). Returns NULL on allocation failure.
 */
struct obd_import *class_new_import(struct obd_device *obd)
{
	struct obd_import *imp;
	struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);

	OBD_ALLOC(imp, sizeof(*imp));
	if (imp == NULL)
		return NULL;

	INIT_LIST_HEAD(&imp->imp_pinger_chain);
	INIT_LIST_HEAD(&imp->imp_replay_list);
	INIT_LIST_HEAD(&imp->imp_sending_list);
	INIT_LIST_HEAD(&imp->imp_delayed_list);
	INIT_LIST_HEAD(&imp->imp_committed_list);
	INIT_LIST_HEAD(&imp->imp_unreplied_list);
	imp->imp_known_replied_xid = 0;
	imp->imp_replay_cursor = &imp->imp_committed_list;
	spin_lock_init(&imp->imp_lock);
	imp->imp_last_success_conn = 0;
	imp->imp_state = LUSTRE_IMP_NEW;
	/* pin the device; released in obd_zombie_import_free() */
	imp->imp_obd = class_incref(obd, "import", imp);
	rwlock_init(&imp->imp_sec_lock);
	init_waitqueue_head(&imp->imp_recovery_waitq);
	INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);

	/* record the caller's pid-namespace reaper, falling back to pid 1 */
	if (curr_pid_ns && curr_pid_ns->child_reaper)
		imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
	else
		imp->imp_sec_refpid = 1;

	/* two refs: one for the caller, one dropped on destroy */
	refcount_set(&imp->imp_refcount, 2);
	atomic_set(&imp->imp_unregistering, 0);
	atomic_set(&imp->imp_reqs, 0);
	atomic_set(&imp->imp_inflight, 0);
	atomic_set(&imp->imp_replay_inflight, 0);
	init_waitqueue_head(&imp->imp_replay_waitq);
	atomic_set(&imp->imp_inval_count, 0);
	atomic_set(&imp->imp_waiting, 0);
	INIT_LIST_HEAD(&imp->imp_conn_list);
	init_imp_at(&imp->imp_at);

	/* the default magic is V2, will be used in connect RPC, and
	 * then adjusted according to the flags in request/reply.
	 */
	imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;

	return imp;
}
EXPORT_SYMBOL(class_new_import);

/* Retire an import: bump imp_generation under imp_lock so stale replies
 * can be recognized, then drop the caller's reference.
 */
void class_destroy_import(struct obd_import *imp)
{
	LASSERT(imp != NULL);
	LASSERT(imp != LP_POISON);

	spin_lock(&imp->imp_lock);
	imp->imp_generation++;
	spin_unlock(&imp->imp_lock);

	class_import_put(imp);
}
EXPORT_SYMBOL(class_destroy_import);

#if LUSTRE_TRACKS_LOCK_EXP_REFS

/* Debug bookkeeping (LUSTRE_TRACKS_LOCK_EXP_REFS only): record that @lock
 * references @exp. The first reference links the lock onto
 * exp->exp_locks_list and sets l_exp_refs_target.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	/* a lock is expected to reference a single export only */
	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);
	}
	if ((lock->l_exp_refs_nr++) == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/* Drop one lock->export debug reference taken by
 * __class_export_add_lock_ref(); on the last one, unlink the lock from
 * exp->exp_locks_list and clear its target.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);
	LASSERT(lock->l_exp_refs_nr > 0);
	if (lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);
	}
	if (-- lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif

/* A connection defines an export context in which preallocation can be
 * managed. This releases the export pointer reference, and returns the export
 * handle, so the export refcount is 1 when this function returns.
 *
 * @conn:   filled in with the cookie of the new export's handle
 * @obd:    device the client is connecting to
 * @cluuid: UUID of the connecting client
 *
 * Returns 0 on success or the error from class_new_export().
 */
int class_connect(struct lustre_handle *conn, struct obd_device *obd,
		  struct obd_uuid *cluuid)
{
	struct obd_export *export;

	LASSERT(conn != NULL);
	LASSERT(obd != NULL);
	LASSERT(cluuid != NULL);
	ENTRY;

	export = class_new_export(obd, cluuid);
	if (IS_ERR(export))
		RETURN(PTR_ERR(export));

	/* callers find the export again via this handle cookie */
	conn->cookie = export->exp_handle.h_cookie;
	class_export_put(export);

	CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
	       cluuid->uuid, conn->cookie);
	RETURN(0);
}
EXPORT_SYMBOL(class_connect);

/* if export is involved in recovery then clean up related things:
 * drop it from the connected-client count (when recovery is running),
 * account it as a stale client, and release any req/lock replay claims.
 */
static void class_export_recovery_cleanup(struct obd_export *exp)
{
	struct obd_device *obd = exp->exp_obd;

	spin_lock(&obd->obd_recovery_task_lock);
	if (test_bit(OBDF_RECOVERING, obd->obd_flags)) {
		if (exp->exp_in_recovery) {
			spin_lock(&exp->exp_lock);
			exp->exp_in_recovery = 0;
			spin_unlock(&exp->exp_lock);
			LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
			atomic_dec(&obd->obd_connected_clients);
		}

		/* if called during recovery then should update
		 * obd_stale_clients counter, lightweight exports is not counted
		 */
		if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
			exp->exp_obd->obd_stale_clients++;
	}
	spin_unlock(&obd->obd_recovery_task_lock);

	spin_lock(&exp->exp_lock);
	/** Cleanup req replay fields */
	if (exp->exp_req_replay_needed) {
		exp->exp_req_replay_needed = 0;

		LASSERT(atomic_read(&obd->obd_req_replay_clients));
		atomic_dec(&obd->obd_req_replay_clients);
	}

	/** Cleanup lock replay data */
	if (exp->exp_lock_replay_needed) {
		exp->exp_lock_replay_needed = 0;

		LASSERT(atomic_read(&obd->obd_lock_replay_clients));
		atomic_dec(&obd->obd_lock_replay_clients);
	}
	spin_unlock(&exp->exp_lock);
}

/* This function removes 1-3 references from the export:
 * 1 - for export pointer passed
 * and if disconnect really needed:
 * 2 - removing from hash
 * 3 - in client_unlink_export
 * The export pointer passed to this function can be destroyed.
 */
int class_disconnect(struct obd_export *export)
{
	int already_disconnected;

	ENTRY;

	if (export == NULL) {
		CWARN("attempting to free NULL export %p\n", export);
		RETURN(-EINVAL);
	}

	/* claim the disconnect atomically under exp_lock so racing callers
	 * only run the teardown once
	 */
	spin_lock(&export->exp_lock);
	already_disconnected = export->exp_disconnected;
	export->exp_disconnected = 1;
#ifdef HAVE_SERVER_SUPPORT
	/*  We hold references of export for uuid hash and nid_hash and export
	 *  link at least. So it is safe to call rh*table_remove_fast in there.
	 */
	obd_nid_del(export->exp_obd, export);
#endif /* HAVE_SERVER_SUPPORT */
	spin_unlock(&export->exp_lock);

	/* class_cleanup(), abort_recovery(), and class_fail_export() all end up
	 * here, and any of them race we shouldn't call extra class_export_puts
	 */
	if (already_disconnected)
		GOTO(no_disconn, already_disconnected);

	CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
	       export->exp_handle.h_cookie);

	class_export_recovery_cleanup(export);
	class_unlink_export(export);
no_disconn:
	/* drop the caller's reference in every case */
	class_export_put(export);
	RETURN(0);
}
EXPORT_SYMBOL(class_disconnect);

/* Return non-zero for a fully connected export */
int class_connected_export(struct obd_export *exp)
{
	int connected = 0;

	if (exp) {
		spin_lock(&exp->exp_lock);
		connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
		spin_unlock(&exp->exp_lock);
	}
	return connected;
}
EXPORT_SYMBOL(class_connected_export);

/* Disconnect every export on @list with exp_flags set to @flags.
 * The device's self-export (client uuid == obd uuid) is only unlinked and
 * dropped, never disconnected.
 */
static void class_disconnect_export_list(struct list_head *list,
					 enum obd_option flags)
{
	int rc;
	struct obd_export *exp;

	ENTRY;

	/* It's possible that an export may disconnect itself, but
	 * nothing else will be added to this list.
	 */
	while ((exp = list_first_entry_or_null(list, struct obd_export,
					       exp_obd_chain)) != NULL) {
		/* hold a ref so the CDEBUG after obd_disconnect() below can
		 * still dereference exp safely
		 */
		class_export_get(exp);

		spin_lock(&exp->exp_lock);
		exp->exp_flags = flags;
		spin_unlock(&exp->exp_lock);

		if (obd_uuid_equals(&exp->exp_client_uuid,
				    &exp->exp_obd->obd_uuid)) {
			CDEBUG(D_HA,
			       "exp %p export uuid == obd uuid, don't discon\n",
			       exp);
			/* Need to delete this now so we don't end up pointing
			 * to work_list later when this export is cleaned up.
			 */
			list_del_init(&exp->exp_obd_chain);
			class_export_put(exp);
			continue;
		}

		/* extra ref handed to obd_disconnect(), see comment below */
		class_export_get(exp);
		CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at %lld\n",
		       exp->exp_obd->obd_name, obd_export_nid2str(exp),
		       exp, exp->exp_last_request_time);
		/* release one export reference anyway */
		rc = obd_disconnect(exp);

		CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
		       obd_export_nid2str(exp), exp, rc);
		class_export_put(exp);
	}
	EXIT;
}

/* Disconnect every export (regular and delayed) of @obd. */
void class_disconnect_exports(struct obd_device *obd)
{
	LIST_HEAD(work_list);

	ENTRY;

	/* Move all of the exports from obd_exports to a work list, en masse. */
	spin_lock(&obd->obd_dev_lock);
	list_splice_init(&obd->obd_exports, &work_list);
	list_splice_init(&obd->obd_delayed_exports, &work_list);
	spin_unlock(&obd->obd_dev_lock);

	if (list_empty(&work_list)) {
		CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
		       obd->obd_minor, obd);
	} else {
		CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
		       obd->obd_minor, obd);
		class_disconnect_export_list(&work_list,
					     exp_flags_from_obd(obd));
	}
	EXIT;
}
EXPORT_SYMBOL(class_disconnect_exports);

/* Remove exports that have not completed recovery.
 * Walks obd_exports: every export for which @test_export() returns zero
 * (and which is not already failed) is marked failed, moved to a private
 * list and disconnected with OBD_OPT_ABORT_RECOV. The self-export and
 * exports without a last_rcvd slot are skipped.
 */
void class_disconnect_stale_exports(struct obd_device *obd,
				    int (*test_export)(struct obd_export *))
{
	LIST_HEAD(work_list);
	struct obd_export *exp, *n;
	int evicted = 0;

	ENTRY;

	spin_lock(&obd->obd_dev_lock);
	list_for_each_entry_safe(exp, n, &obd->obd_exports,
				 exp_obd_chain) {
		/* don't count self-export as client */
		if (obd_uuid_equals(&exp->exp_client_uuid,
				    &exp->exp_obd->obd_uuid))
			continue;

		/* don't evict clients which have no slot in last_rcvd
		 * (e.g. lightweight connection)
		 */
		if (exp->exp_target_data.ted_lr_idx == -1)
			continue;

		spin_lock(&exp->exp_lock);
		/* already failed, or @test_export says to keep it */
		if (exp->exp_failed || test_export(exp)) {
			spin_unlock(&exp->exp_lock);
			continue;
		}
		exp->exp_failed = 1;
		atomic_inc(&exp->exp_obd->obd_eviction_count);
		spin_unlock(&exp->exp_lock);

		list_move(&exp->exp_obd_chain, &work_list);
		evicted++;
		CWARN("%s: disconnect stale client %s@%s\n",
		      obd->obd_name, exp->exp_client_uuid.uuid,
		      obd_export_nid2str(exp));
		print_export_data(exp, "EVICTING", 0, D_HA);
	}
	spin_unlock(&obd->obd_dev_lock);

	if (evicted)
		LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
			      obd->obd_name, evicted);

	class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
						 OBD_OPT_ABORT_RECOV);
	EXIT;
}
EXPORT_SYMBOL(class_disconnect_stale_exports);

/* Mark @exp failed and disconnect it (eviction path). Only the first
 * caller performs the disconnect; repeated calls are no-ops.
 */
void class_fail_export(struct obd_export *exp)
{
	int rc, already_failed;

	/* claim the failure atomically under exp_lock */
	spin_lock(&exp->exp_lock);
	already_failed = exp->exp_failed;
	exp->exp_failed = 1;
	spin_unlock(&exp->exp_lock);

	if (already_failed) {
		CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
		       exp, exp->exp_client_uuid.uuid);
		return;
	}

	atomic_inc(&exp->exp_obd->obd_eviction_count);

	CDEBUG(D_HA, "disconnecting export %p/%s\n",
	       exp, exp->exp_client_uuid.uuid);

	if (obd_dump_on_timeout)
		libcfs_debug_dumplog();

	/* need for safe call CDEBUG after obd_disconnect */
	class_export_get(exp);

	/* Callers into obd_disconnect are removing their own ref(eg request) in
	 * addition to one from hash table. We don't have such a ref so make one
	 */
	class_export_get(exp);
	rc = obd_disconnect(exp);
	if (rc)
		CERROR("disconnecting export %p failed: %d\n", exp, rc);
	else
		CDEBUG(D_HA, "disconnected export %p/%s\n",
		       exp, exp->exp_client_uuid.uuid);
	class_export_put(exp);
}
EXPORT_SYMBOL(class_fail_export);

#ifdef HAVE_SERVER_SUPPORT

static int take_first(struct obd_export *exp, void *data)
{
	struct obd_export **expp = data;

	if (*expp)
		/* already have one */
		return 0;
	if (exp->exp_failed)
		/* Don't want this one */
		return 0;
	if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
		/* Cannot get a ref on this one */
		return 0;
	*expp = exp;
	return 1;
}

/* Evict every non-failed export of @obd hashed under NID @nid.
 *
 * Returns the number of exports evicted (0 when none matched or the
 * device is stopping), or a negative errno if the lu_env setup failed.
 */
int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
{
	struct lnet_nid nid_key;
	struct obd_export *doomed_exp;
	int exports_evicted = 0;
	struct lu_env *env = NULL, _env;

	libcfs_strnid(&nid_key, nid);

	spin_lock(&obd->obd_dev_lock);
	/* umount already run. evict thread should stop leaving unmount thread
	 * to take over
	 */
	if (obd->obd_stopping) {
		spin_unlock(&obd->obd_dev_lock);
		return 0;
	}
	spin_unlock(&obd->obd_dev_lock);

	/* can be called via procfs and from ptlrpc; set up a local env
	 * only when the caller didn't provide one
	 */
	env = lu_env_find();
	if (env == NULL) {
		int rc = lu_env_init(&_env, LCT_DT_THREAD | LCT_MD_THREAD);

		if (rc)
			return rc;
		env = &_env;
		rc = lu_env_add(env);
		if (unlikely(rc))
			GOTO(out_fini, exports_evicted = rc);
	}

	/* evict matching exports one at a time; take_first() returns each
	 * with a reference held, dropped after class_fail_export()
	 */
	doomed_exp = NULL;
	while (obd_nid_export_for_each(obd, &nid_key,
				       take_first, &doomed_exp) > 0) {

		LASSERTF(doomed_exp != obd->obd_self_export,
			 "self-export is hashed by NID?\n");

		LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
			      obd->obd_name,
			      obd_uuid2str(&doomed_exp->exp_client_uuid),
			      obd_export_nid2str(doomed_exp));

		class_fail_export(doomed_exp);
		class_export_put(doomed_exp);
		exports_evicted++;
		doomed_exp = NULL;
	}

	if (!exports_evicted)
		CDEBUG(D_HA,
		       "%s: can't disconnect NID '%s': no exports found\n",
		       obd->obd_name, nid);

	/* out_fini sits inside this block on purpose: it is only reachable
	 * when we initialized _env ourselves above
	 */
	if (env == &_env) {
		lu_env_remove(&_env);
out_fini:
		lu_env_fini(&_env);
	}

	return exports_evicted;
}
EXPORT_SYMBOL(obd_export_evict_by_nid);

/* Evict the export of @obd registered under client UUID @uuid.
 *
 * Returns 1 if an export was evicted, 0 when none matched, the device is
 * stopping, or @uuid names the device itself; negative errno if lu_env
 * setup failed.
 */
int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
{
	struct obd_export *doomed_exp = NULL;
	struct obd_uuid doomed_uuid;
	struct lu_env env;
	int rc = 0;

	spin_lock(&obd->obd_dev_lock);
	if (obd->obd_stopping) {
		/* umount in progress; leave cleanup to the unmount thread */
		spin_unlock(&obd->obd_dev_lock);
		return 0;
	}
	spin_unlock(&obd->obd_dev_lock);

	obd_str2uuid(&doomed_uuid, uuid);
	if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
		CERROR("%s: can't evict myself\n", obd->obd_name);
		return 0;
	}

	rc = lu_env_init(&env, LCT_DT_THREAD | LCT_MD_THREAD);
	if (rc)
		return rc;
	rc = lu_env_add(&env);
	if (unlikely(rc))
		goto out_fini;

	doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
	if (doomed_exp == NULL) {
		CERROR("%s: can't disconnect %s: no exports found\n",
		       obd->obd_name, uuid);
	} else {
		CWARN("%s: evicting %s at administrative request\n",
		      obd->obd_name, doomed_exp->exp_client_uuid.uuid);
		class_fail_export(doomed_exp);
		/* drop the uuid-hash linkage while we still hold the lookup
		 * reference: touching doomed_exp after class_export_put()
		 * would risk use-after-free if ours was the last ref
		 */
		obd_uuid_del(obd, doomed_exp);
		class_export_put(doomed_exp);
		rc = 1;
	}

	lu_env_remove(&env);
out_fini:
	lu_env_fini(&env);

	return rc;
}
#endif /* HAVE_SERVER_SUPPORT */

#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook invoked from print_export_data() to dump per-export lock
 * information when lock/export reference tracking is compiled in.
 */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif

/* Log one line describing @exp (refcounts, state flags, outstanding reply
 * count) at @debug_level, prefixed with @status ("ACTIVE", "EVICTING", ...).
 * When @locks is set, also run the optional lock-dump hook.
 */
static void print_export_data(struct obd_export *exp, const char *status,
			      int locks, int debug_level)
{
	struct ptlrpc_reply_state *rs;
	struct ptlrpc_reply_state *first_reply = NULL;
	int nreplies = 0;

	/* count outstanding replies under exp_lock, remembering the first */
	spin_lock(&exp->exp_lock);
	list_for_each_entry(rs, &exp->exp_outstanding_replies,
			    rs_exp_list) {
		if (nreplies == 0)
			first_reply = rs;
		nreplies++;
	}
	spin_unlock(&exp->exp_lock);

	CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu stale:%d\n",
	       exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
	       obd_export_nid2str(exp),
	       refcount_read(&exp->exp_handle.h_ref),
	       atomic_read(&exp->exp_rpc_count),
	       atomic_read(&exp->exp_cb_count),
	       atomic_read(&exp->exp_locks_count),
	       exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
	       nreplies, first_reply, nreplies > 3 ? "..." : "",
	       exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
#if LUSTRE_TRACKS_LOCK_EXP_REFS
	if (locks && class_export_dump_hook != NULL)
		class_export_dump_hook(exp);
#endif
}

/* Log every export of @obd — active, unlinked and delayed, in that
 * order — at @debug_level; @locks is forwarded to print_export_data().
 */
void dump_exports(struct obd_device *obd, int locks, int debug_level)
{
	static const char *const labels[] = { "ACTIVE", "UNLINKED", "DELAYED" };
	struct list_head *heads[3];
	struct obd_export *exp;
	int i;

	heads[0] = &obd->obd_exports;
	heads[1] = &obd->obd_unlinked_exports;
	heads[2] = &obd->obd_delayed_exports;

	spin_lock(&obd->obd_dev_lock);
	for (i = 0; i < 3; i++)
		list_for_each_entry(exp, heads[i], exp_obd_chain)
			print_export_data(exp, labels[i], locks, debug_level);
	spin_unlock(&obd->obd_dev_lock);
}

/* Block until all unlinked exports of @obd have been reaped. Polls with
 * exponentially growing sleeps, dumping the remaining exports once the
 * wait gets long (waited > 5 and a power of two, to rate-limit warnings).
 */
void obd_exports_barrier(struct obd_device *obd)
{
	int waited = 2;

	LASSERT(list_empty(&obd->obd_exports));
	spin_lock(&obd->obd_dev_lock);
	while (!list_empty(&obd->obd_unlinked_exports)) {
		spin_unlock(&obd->obd_dev_lock);
		schedule_timeout_uninterruptible(cfs_time_seconds(waited));
		if (waited > 5 && is_power_of_2(waited)) {
			LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
				      obd->obd_name, waited,
				      kref_read(&obd->obd_refcount));
			dump_exports(obd, 1, D_CONSOLE | D_WARNING);
		}
		waited *= 2;
		spin_lock(&obd->obd_dev_lock);
	}
	spin_unlock(&obd->obd_dev_lock);
}
EXPORT_SYMBOL(obd_exports_barrier);

/* Add export to the obd_zombie workqueue and notify it: unlink @exp from
 * its device's export list and queue its zombie work item.
 */
static void obd_zombie_export_add(struct obd_export *exp)
{
	/* balanced by a decrement once the zombie work has run */
	atomic_inc(&obd_stale_export_num);
	spin_lock(&exp->exp_obd->obd_dev_lock);
	LASSERT(!list_empty(&exp->exp_obd_chain));
	list_del_init(&exp->exp_obd_chain);
	spin_unlock(&exp->exp_obd->obd_dev_lock);
	queue_work(zombie_wq, &exp->exp_zombie_work);
}

/* Add import to the obd_zombie workqueue and notify it; the work item
 * runs obd_zombie_imp_cull() to free @imp.
 */
static void obd_zombie_import_add(struct obd_import *imp)
{
	LASSERT(imp->imp_sec == NULL);

	queue_work(zombie_wq, &imp->imp_zombie_work);
}

/* wait when obd_zombie import/export queues become empty */
void obd_zombie_barrier(void)
{
	/* first wait for stale exports to drain, then flush the workqueue
	 * so all queued zombie work items have finished
	 */
	wait_var_event(&obd_stale_export_num,
			atomic_read(&obd_stale_export_num) == 0);
	flush_workqueue(zombie_wq);
}
EXPORT_SYMBOL(obd_zombie_barrier);


/* Pop the first export from the global stale-export list, or return NULL
 * when the list is empty. The caller inherits the list's reference.
 */
struct obd_export *obd_stale_export_get(void)
{
	struct obd_export *exp;

	ENTRY;

	spin_lock(&obd_stale_export_lock);
	exp = list_first_entry_or_null(&obd_stale_exports, struct obd_export,
				       exp_stale_list);
	if (exp != NULL)
		list_del_init(&exp->exp_stale_list);
	spin_unlock(&obd_stale_export_lock);

	if (exp != NULL)
		CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
		       atomic_read(&obd_stale_export_num));

	RETURN(exp);
}
EXPORT_SYMBOL(obd_stale_export_get);

/* Return @exp to the global stale-export list while it still holds locks
 * in its lock hash; otherwise drop the reference for good. Exports with
 * blocked locks are queued at the head so they are reprocessed first.
 */
void obd_stale_export_put(struct obd_export *exp)
{
	ENTRY;

	LASSERT(list_empty(&exp->exp_stale_list));
	if (exp->exp_lock_hash &&
	    atomic_read(&exp->exp_lock_hash->hs_count)) {
		CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
		       atomic_read(&obd_stale_export_num));

		/* lock order: exp_bl_list_lock then obd_stale_export_lock,
		 * matching obd_stale_export_adjust()
		 */
		spin_lock_bh(&exp->exp_bl_list_lock);
		spin_lock(&obd_stale_export_lock);
		/* Add to the tail if there is no blocked locks,
		 * to the head otherwise.
		 */
		if (list_empty(&exp->exp_bl_list))
			list_add_tail(&exp->exp_stale_list,
				      &obd_stale_exports);
		else
			list_add(&exp->exp_stale_list,
				 &obd_stale_exports);

		spin_unlock(&obd_stale_export_lock);
		spin_unlock_bh(&exp->exp_bl_list_lock);
	} else {
		class_export_put(exp);
	}
	EXIT;
}
EXPORT_SYMBOL(obd_stale_export_put);

/**
 * Adjust the position of the export in the stale list,
 * i.e. move it to the head of the list when it has blocked locks
 * (exp_bl_list non-empty), so it gets reprocessed first.
 **/
void obd_stale_export_adjust(struct obd_export *exp)
{
	LASSERT(exp != NULL);
	/* lock order matches obd_stale_export_put() */
	spin_lock_bh(&exp->exp_bl_list_lock);
	spin_lock(&obd_stale_export_lock);

	if (!list_empty(&exp->exp_stale_list) &&
	    !list_empty(&exp->exp_bl_list))
		list_move(&exp->exp_stale_list, &obd_stale_exports);

	spin_unlock(&obd_stale_export_lock);
	spin_unlock_bh(&exp->exp_bl_list_lock);
}
EXPORT_SYMBOL(obd_stale_export_adjust);

/* Create the CPT-bound workqueue that destroys zombie imports/exports;
 * returns 0 on success or the PTR_ERR of the failed workqueue creation.
 */
int obd_zombie_impexp_init(void)
{
	zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab, 0,
					   CFS_CPT_ANY,
					   cfs_cpt_number(cfs_cpt_tab));
	if (IS_ERR(zombie_wq))
		return PTR_ERR(zombie_wq);

	return 0;
}

/* stop destroy zombie import/export thread */
void obd_zombie_impexp_stop(void)
{
	/* drains and frees the workqueue; all queued zombies are culled */
	destroy_workqueue(zombie_wq);
	LASSERT(list_empty(&obd_stale_exports));
}

/***** Kernel-userspace comm helpers *******/

/* Total KUC message length: header plus @payload_len payload bytes. */
int kuc_len(int payload_len)
{
	return payload_len + sizeof(struct kuc_hdr);
}
EXPORT_SYMBOL(kuc_len);

/* Map a payload pointer back to its kuc_hdr, which sits immediately
 * before the payload in memory; asserts the header magic is intact.
 * @param p Pointer to payload area
 * @returns Pointer to kuc header
 */
struct kuc_hdr *kuc_ptr(void *p)
{
	struct kuc_hdr *hdr = (struct kuc_hdr *)p - 1;

	LASSERT(hdr->kuc_magic == KUC_MAGIC);
	return hdr;
}
EXPORT_SYMBOL(kuc_ptr);

/* Allocate a KUC message with @payload_len payload bytes for
 * @transport/@type and fill in the header.
 * @return Pointer to the payload area (use kuc_ptr() to recover the
 *         header), or ERR_PTR(-ENOMEM) on allocation failure.
 */
void *kuc_alloc(int payload_len, int transport, int type)
{
	int total = kuc_len(payload_len);
	struct kuc_hdr *hdr;

	OBD_ALLOC(hdr, total);
	if (hdr == NULL)
		return ERR_PTR(-ENOMEM);

	hdr->kuc_magic = KUC_MAGIC;
	hdr->kuc_transport = transport;
	hdr->kuc_msgtype = type;
	hdr->kuc_msglen = total;

	/* payload starts right after the header */
	return (void *)(hdr + 1);
}
EXPORT_SYMBOL(kuc_alloc);

/* Free a message from kuc_alloc(); @p is the payload pointer and
 * @payload_len the length passed at allocation time.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);

/* Per-waiter state for obd_get_request_slot(): queued on
 * cli->cl_flight_waiters until a slot is granted or the wait aborts.
 */
struct obd_request_slot_waiter {
	struct list_head	orsw_entry;	/* link in cl_flight_waiters */
	wait_queue_head_t	orsw_waitq;	/* the waiter sleeps here */
	bool			orsw_signaled;	/* force-signaled; wait returns -EINTR */
};

/* Return true once @orsw has been granted a slot, i.e. it is no longer
 * linked on cli->cl_flight_waiters (the granter dequeues it). Used as the
 * wait condition in obd_get_request_slot(); takes cl_loi_list_lock to
 * read the list state safely.
 */
static bool obd_request_slot_avail(struct client_obd *cli,
				   struct obd_request_slot_waiter *orsw)
{
	bool avail;

	spin_lock(&cli->cl_loi_list_lock);
	/* assignment to bool normalizes the value; no '!!' needed */
	avail = list_empty(&orsw->orsw_entry);
	spin_unlock(&cli->cl_loi_list_lock);

	return avail;
}

/*
 * For network flow control, the RPC sponsor needs to acquire a credit
 * before sending the RPC. The credits count for a connection is defined
 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
 * the subsequent RPC sponsors need to wait until others released their
 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
 *
 * Returns 0 once a slot is held; -EINTR if the wait was interrupted or
 * the waiter was force-signaled (orsw_signaled).
 */
int obd_get_request_slot(struct client_obd *cli)
{
	struct obd_request_slot_waiter	 orsw;
	int				 rc;

	/* fast path: a slot is free right now */
	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
		cli->cl_rpcs_in_flight++;
		spin_unlock(&cli->cl_loi_list_lock);
		return 0;
	}

	/* no slot: queue on cl_flight_waiters and sleep; the releaser
	 * dequeues us (emptying orsw_entry) when it grants us the slot
	 */
	init_waitqueue_head(&orsw.orsw_waitq);
	list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
	orsw.orsw_signaled = false;
	spin_unlock(&cli->cl_loi_list_lock);

	rc = l_wait_event_abortable(orsw.orsw_waitq,
				    obd_request_slot_avail(cli, &orsw) ||
				    orsw.orsw_signaled);

	/* Here, we must take the lock to avoid the on-stack 'orsw' to be
	 * freed but other (such as obd_put_request_slot) is using it.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	if (rc != 0) {
		if (!orsw.orsw_signaled) {
			/* interrupted: if already dequeued we own a slot and
			 * must give it back, otherwise leave the queue
			 */
			if (list_empty(&orsw.orsw_entry))
				cli->cl_rpcs_in_flight--;
			else
				list_del(&orsw.orsw_entry);
		}
		rc = -EINTR;
	}

	if (orsw.orsw_signaled) {
		/* whoever signaled us must have dequeued us already */
		LASSERT(list_empty(&orsw.orsw_entry));

		rc = -EINTR;
	}
	spin_unlock(&cli->cl_loi_list_lock);

	return rc;
}
EXPORT_SYMBOL(obd_get_request_slot);

/* Release a request slot taken via obd_get_request_slot(). If waiters are
 * queued and capacity remains, hand the slot directly to the first waiter:
 * emptying its orsw_entry is its wakeup condition.
 */
void obd_put_request_slot(struct client_obd *cli)
{
	struct obd_request_slot_waiter *orsw;

	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_rpcs_in_flight--;

	/* If there is free slot, wakeup the first waiter. */
	if (!list_empty(&cli->cl_flight_waiters) &&
	    likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
		orsw = list_first_entry(&cli->cl_flight_waiters,
					struct obd_request_slot_waiter,
					orsw_entry);
		list_del_init(&orsw->orsw_entry);
		cli->cl_rpcs_in_flight++;
		wake_up(&orsw->orsw_waitq);
	}
	spin_unlock(&cli->cl_loi_list_lock);
}
EXPORT_SYMBOL(obd_put_request_slot);

/* Return the current cl_max_rpcs_in_flight limit of @cli. */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
	return cli->cl_max_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);

/* Set the maximum number of RPCs in flight for @cli to @max.
 *
 * For MDC devices, max_mod_rpcs_in_flight is kept strictly below
 * max_rpcs_in_flight and is lowered first when necessary. When the limit
 * grows, wake up as many queued request-slot waiters as new slots exist.
 *
 * Returns 0 on success, -ERANGE when @max is out of [1, OBD_MAX_RIF_MAX]
 * or conflicts with the modify-RPC limit.
 */
int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
{
	struct obd_request_slot_waiter *orsw;
	__u32				old;
	int				diff;
	int				i;
	int				rc;

	if (max > OBD_MAX_RIF_MAX || max < 1)
		return -ERANGE;

	CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
	       cli->cl_import->imp_obd->obd_name, max,
	       cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);

	if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
		   LUSTRE_MDC_NAME) == 0) {
		/* adjust max_mod_rpcs_in_flight to ensure it is always
		 * strictly lower than max_rpcs_in_flight
		 */
		if (max < 2) {
			CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
			       cli->cl_import->imp_obd->obd_name);
			return -ERANGE;
		}
		if (max <= cli->cl_max_mod_rpcs_in_flight) {
			rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
			if (rc != 0)
				return rc;
		}
	}

	spin_lock(&cli->cl_loi_list_lock);
	old = cli->cl_max_rpcs_in_flight;
	cli->cl_max_rpcs_in_flight = max;
	client_adjust_max_dirty(cli);

	diff = max - old;

	/* We increase the max_rpcs_in_flight, then wakeup some waiters.
	 * Request-slot waiters are queued on cl_flight_waiters — the same
	 * list obd_get_request_slot() adds to and obd_put_request_slot()
	 * grants from.
	 */
	for (i = 0; i < diff; i++) {
		orsw = list_first_entry_or_null(&cli->cl_flight_waiters,
						struct obd_request_slot_waiter,
						orsw_entry);
		if (!orsw)
			break;

		list_del_init(&orsw->orsw_entry);
		cli->cl_rpcs_in_flight++;
		wake_up(&orsw->orsw_waitq);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	return 0;
}
EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);

/* Return the current cl_max_mod_rpcs_in_flight limit of @cli. */
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
{
	return cli->cl_max_mod_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);

/* Set the maximum number of modify RPCs in flight for @cli to @max.
 *
 * The value must stay strictly below max_rpcs_in_flight (which is raised
 * here if needed) and must not exceed the server-advertised limit
 * (ocd_maxmodrpcs). Queued waiters are woken when the limit increases.
 *
 * Returns 0 on success, -ERANGE when @max is out of range.
 */
int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
{
	struct obd_connect_data *ocd;
	__u16 maxmodrpcs;
	__u16 prev;

	if (max > OBD_MAX_RIF_MAX || max < 1)
		return -ERANGE;

	ocd = &cli->cl_import->imp_connect_data;
	CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
	       cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
	       ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);

	/* clamp so that max + 1 below stays within OBD_MAX_RIF_MAX */
	if (max == OBD_MAX_RIF_MAX)
		max = OBD_MAX_RIF_MAX - 1;

	/* Cannot exceed or equal max_rpcs_in_flight.  If we are asked to
	 * increase this value, also bump up max_rpcs_in_flight to match.
	 */
	if (max >= cli->cl_max_rpcs_in_flight) {
		CDEBUG(D_INFO,
		       "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
		       cli->cl_import->imp_obd->obd_name, max + 1, max);
		obd_set_max_rpcs_in_flight(cli, max + 1);
	}

	/* cannot exceed max modify RPCs in flight supported by the server,
	 * but verify ocd_connect_flags is at least initialized first.  If
	 * not, allow it and fix value later in ptlrpc_connect_set_flags().
	 */
	if (!ocd->ocd_connect_flags) {
		maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
	} else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
		maxmodrpcs = ocd->ocd_maxmodrpcs;
		if (maxmodrpcs == 0) { /* connection not finished yet */
			maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
			CDEBUG(D_INFO,
			       "%s: partial connect, assume maxmodrpcs=%hu\n",
			       cli->cl_import->imp_obd->obd_name, maxmodrpcs);
		}
	} else {
		maxmodrpcs = 1;
	}
	if (max > maxmodrpcs) {
		CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
		       cli->cl_import->imp_obd->obd_name,
		       max, maxmodrpcs);
		return -ERANGE;
	}

	spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);

	prev = cli->cl_max_mod_rpcs_in_flight;
	cli->cl_max_mod_rpcs_in_flight = max;

	/* wakeup waiters if limit has been increased */
	if (cli->cl_max_mod_rpcs_in_flight > prev)
		wake_up_locked(&cli->cl_mod_rpcs_waitq);

	spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);

	return 0;
}
EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);

/* Print the modify-RPCs-in-flight histogram of @cli to @seq, taking
 * cl_mod_rpcs_waitq.lock for a consistent snapshot. Always returns 0.
 */
int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
			       struct seq_file *seq)
{
	unsigned long mod_tot = 0, mod_cum;
	int i;

	spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
	lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
			     ":", true, "");
	seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
		   cli->cl_mod_rpcs_in_flight);

	seq_puts(seq, "\n\t\t\tmodify\n");
	/* seq_puts() does no format processing, so emit literal '%' signs
	 * rather than the "%%" escapes seq_printf() would require
	 */
	seq_puts(seq, "rpcs in flight        rpcs   % cum %\n");

	mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);

	mod_cum = 0;
	for (i = 0; i < OBD_HIST_MAX; i++) {
		unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];

		mod_cum += mod;
		seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
			   i, mod, pct(mod, mod_tot),
			   pct(mod_cum, mod_tot));
		/* all remaining buckets are zero once the cumulative count
		 * reaches the total
		 */
		if (mod_cum == mod_tot)
			break;
	}

	spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);

	return 0;
}
EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);

/* The number of modify RPCs sent in parallel is limited
 * because the server has a finite number of slots per client to
 * store request result and ensure reply reconstruction when needed.
 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
 * that takes into account server limit and cl_max_rpcs_in_flight
 * value.
 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
 * one close request is allowed above the maximum.
 */
struct mod_waiter {
	struct client_obd *cli;		/* client whose slot is wanted */
	bool close_req;			/* waiting to send an MDS_CLOSE */
	bool woken;			/* slot granted; wake func ran once */
	wait_queue_entry_t wqe;		/* entry on cl_mod_rpcs_waitq */
};
/* Custom wake function installed on cl_mod_rpcs_waitq entries: called for
 * each queued mod_waiter under the waitqueue lock, it atomically claims a
 * modify RPC slot when one is available and then wakes that waiter.
 * Return value steers the waitqueue scan: 0 keeps scanning, -1 stops it,
 * positive counts as a successful (exclusive) wakeup.
 */
static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
				  unsigned int mode, int flags, void *key)
{
	struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
	struct client_obd *cli = w->cli;
	bool close_req = w->close_req;
	bool avail;
	int ret;

	/* As woken_wake_function() doesn't remove us from the wait_queue,
	 * we use own flag to ensure we're called just once.
	 */
	if (w->woken)
		return 0;

	/* A slot is available if
	 * - number of modify RPCs in flight is less than the max
	 * - it's a close RPC and no other close request is in flight
	 */
	avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
		(close_req && cli->cl_close_rpcs_in_flight == 0);
	if (avail) {
		cli->cl_mod_rpcs_in_flight++;
		if (close_req)
			cli->cl_close_rpcs_in_flight++;
		ret = woken_wake_function(wq_entry, mode, flags, key);
		w->woken = true;
	} else if (cli->cl_close_rpcs_in_flight)
		/* No other waiter could be woken */
		ret = -1;
	else if (!key)
		/* This was not a wakeup from a close completion or a new close
		 * being queued, so there is no point seeing if there are close
		 * waiters to be woken.
		 */
		ret = -1;
	else
		/* There might be a close we could wake, keep looking */
		ret = 0;
	return ret;
}

/* Get a modify RPC slot from the obd client @cli according
 * to the kind of operation @opc that is going to be sent.
 * If the maximum number of modify RPCs in flight is reached
 * the thread is put to sleep.
 * Returns the tag to be set in the request message. Tag 0
 * is reserved for non-modifying requests.
 */
__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
{
	struct mod_waiter wait = {
		.cli = cli,
		.close_req = (opc == MDS_CLOSE),
		.woken = false,
	};
	__u16			i, max;

	init_wait(&wait.wqe);
	/* claim_mod_rpc_function() both claims the slot and wakes us */
	wait.wqe.func = claim_mod_rpc_function;
	spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
	/* If it's kthread and don't have set_child_tid */
	if ((current->flags & PF_KTHREAD) && (current->flags & PF_MEMALLOC) &&
	    !current->set_child_tid) {
		/* Skip wait_woken as it will cause kernel panic (LU-18826).
		 * Also confirm it's on the mem alloc path by PF_MEMALLOC.
		 * In this dedicated case, grant a slot.
		 */
		cli->cl_mod_rpcs_in_flight++;
		if (wait.close_req)
			cli->cl_close_rpcs_in_flight++;
		LCONSOLE_INFO("%s: Force grant RPC slot (%u current) to proc with flag: %x.\n",
			cli->cl_import->imp_obd->obd_name,
			cli->cl_mod_rpcs_in_flight, current->flags);
	} else {
		__add_wait_queue_entry_tail(&cli->cl_mod_rpcs_waitq, &wait.wqe);
		/* This wakeup will only succeed if the maximums haven't
		 * been reached.  If that happens, wait.woken will be set
		 * and there will be no need to wait.
		 * If a close_req was enqueue, ensure we search all the way to
		 * the end of the waitqueue for a close request.
		 */
		__wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
				     (void *)wait.close_req);

		/* sleep until claim_mod_rpc_function() grants us a slot */
		while (wait.woken == false) {
			spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
			wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
				MAX_SCHEDULE_TIMEOUT);
			spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
		}
		__remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
	}
	/* In extreme situation like (LU-18826), cl_mod_rpcs_in_flight
	 * can go above cl_max_mod_rpcs_in_flight, use greater value here
	 * to make sure the slot can be found in cl_mod_tag_bitmap
	 */
	max = max(cli->cl_max_mod_rpcs_in_flight, cli->cl_mod_rpcs_in_flight);
	lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
			 cli->cl_mod_rpcs_in_flight);
	/* find a free tag */
	i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
				max + 1);
	LASSERT(i < OBD_MAX_RIF_MAX);
	LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
	spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
	/* tag 0 is reserved for non-modify RPCs */

	CDEBUG(D_RPCTRACE,
	       "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
	       cli->cl_import->imp_obd->obd_name,
	       i + 1, opc, max);

	return i + 1;
}
EXPORT_SYMBOL(obd_get_mod_rpc_slot);

/* Put a modify RPC slot from the obd client @cli according
 * to the kind of operation @opc that has been sent.
 * @tag is the value returned by obd_get_mod_rpc_slot(); tag 0 means no
 * slot was held, so the call is a no-op.
 */
void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
{
	bool			close_req = false;

	if (tag == 0)
		return;

	if (opc == MDS_CLOSE)
		close_req = true;

	spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
	cli->cl_mod_rpcs_in_flight--;
	if (close_req)
		cli->cl_close_rpcs_in_flight--;
	/* release the tag in the bitmap */
	LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
	LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
	/* a slot just freed: let claim_mod_rpc_function() pick a waiter;
	 * close_req is passed as the key so close waiters are searched for
	 */
	__wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
			     (void *)close_req);
	spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
}
EXPORT_SYMBOL(obd_put_mod_rpc_slot);
